/*
Timemachine
Copyright (c) 2006 Technische Universitaet Muenchen,
                   Technische Universitaet Berlin,
                   The Regents of the University of California
All rights reserved.


Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

1. Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3. Neither the names of the copyright owners nor the names of its
   contributors may be used to endorse or promote products derived from
   this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* main.cc
 * $Id: main.cc 251 2009-02-04 08:14:24Z gregor $
 *
 */

#include "config.h"

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <pcap.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <net/ethernet.h>
#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
//#include <netinet/tcp.h>
//#include <netinet/udp.h>
#include <arpa/inet.h>
#include <signal.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#define _REENTRANT
#include <readline/readline.h>
#include <readline/history.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/resource.h>
#ifdef __linux
#include <malloc.h>
#define USE_MALLINFO
#endif
#include <sys/ioctl.h>

#ifdef HAVE_LIBBROCCOLI
#include "BroccoliComm.hh"
#endif

#ifdef HAVE_LIBPCAPNAV
#include <pcapnav.h>
#endif

#include "conf.h"
#include "types.h"

#include "Storage.hh"
#include "Fifo.hh"
#include "LogFile.hh"
#include "Query.hh"
#include "Index.hh"


/***************************************************************************
 * global variables
 */
unsigned had_to_wait = 0;

// FIXME: I want a header file
void *
rmtconsole_listen_thread(void *arg);

char errbuf[PCAP_ERRBUF_SIZE]="";

// unsigned int smallest=UINT_MAX, largest=0;
uint64_t tot_bytes=0, tot_pkt_cnt=0;
uint64_t uncut_bytes=0, uncut_pkt_cnt=0;
// TODO: these values are not thread-safe. Altough since approx. figures areq 
// enough at the moment, we don't lock!!
uint64_t querySentPkts = 0;
uint64_t querySentBytes = 0;
struct timeval stats_last_call_ts;

Storage* storage=NULL;


#define HOSTNAME_MAXLEN 32
char hostname[HOSTNAME_MAXLEN];

LogFile* log_file = NULL;

pthread_t
	rmtconsole_listen_thread_tid,
	cli_console_thread_tid,
	statisticslog_thread_tid,
	index_aggregation_thread_tid,
	main_thread_tid
	;




/* Got some strange effects with signals being received from multiple
 * threads. Hope this mutex helps
 */
//static pthread_mutex_t sigmutex; 


/***************************************************************************
 * configuration parameters
 */

StorageConfig storageConf;
int conf_main_log_interval=60;
const char* conf_main_workdir="./";
const char* conf_main_indexdir="./";
const char* conf_main_queryfiledir="./";
const char* conf_main_logfile_name="tm.log";
const char* conf_main_bro_connect_str=NULL;
int conf_main_console=0;
int conf_main_daemon=0;
int conf_main_tweak_capture_thread = TM_TWEAK_CAPTURE_THREAD_NONE;
unsigned short conf_main_rmtconsole_port = 42042;
struct in_addr conf_main_rmtconsole_listen_addr; // defualt value is set in main!
int conf_main_rmtconsole = 0;
int stderr_is_open = 1;
int conf_main_bro_listen = 1;
int conf_main_bro_listen_port = 47557;
struct in_addr conf_main_bro_listen_addr; // defualt value is set in main!



/***************************************************************************
 * loggins functions
 */
#define MAX_MSG_LEN 1024

static void tmlog_backend(int severity, const char *ident, const char *msg) {
	if (severity == TM_LOG_ERROR && stderr_is_open) {
		fprintf(stderr, "tm: %s: %s\n", ident, msg);
	}
	if (log_file)
		log_file->log(ident, msg);
}

void tmlog(int severity, const char *ident, const char *fmt, ...) {
	va_list ap;
	char msg[MAX_MSG_LEN];

	msg[0] = '\0';
	switch(severity) {
		case TM_LOG_WARN:
			strcpy(msg, "WARNING: ");
			break;
		case TM_LOG_ERROR:
			strcpy(msg, "ERROR: ");
			break;
		default:
			break;
	}
	va_start(ap, fmt);
	vsnprintf(msg+strlen(msg), MAX_MSG_LEN-strlen(msg), fmt, ap);
	va_end(ap);
	tmlog_backend(severity, ident,  msg);
}

void tmlog(const char *ident, const char *fmt, ...) {
	va_list ap;
	char msg[MAX_MSG_LEN];
	va_start(ap, fmt);
	vsnprintf(msg, MAX_MSG_LEN, fmt, ap);
	va_end(ap);
	tmlog_backend(TM_LOG_NOTE, ident,  msg);
}


/***************************************************************************
 * output pcap statistics
 */
void print_stats(FILE *outfp) {
	struct pcap_stat ps;

	storage->getPcapStats(&ps);
	fprintf(outfp, "%u\tpkts received\t[pcap_stats]\n"
		   "%u\tpkts dropped by kernel\t[pcap_stats]\n"
		   "%.2f\t%% dropped/received\t[pcap_stats]\n"
		   "%"PRINTF_UINT64"\tpkts total received by capture application\n"
		   "%"PRINTF_UINT64"\tbytes total\n",
		   ps.ps_recv, ps.ps_drop,
		   (float)ps.ps_drop/ps.ps_recv*100,
		   tot_pkt_cnt, tot_bytes);
	if (stats_last_call_ts.tv_sec) {
		struct timeval now;
		gettimeofday(&now, NULL);
		
		//double dt=now.tv_sec-stats_last_call_ts.tv_sec
		//		  + (now.tv_usec-stats_last_call_ts.tv_usec)/1e6;
		/* ((now.tv_sec-stats_last_call_ts.tv_sec)*1e6
		   +now.tv_usec-stats_last_call_ts.tv_usec)/1e6; */
		/*fprintf(outfp, "%.1f\tMbit/s avg since last stats call (%.1fs ago)\n",
			   (tot_bytes-stats_last_call_tot_bytes)/dt/125000, dt); */
	}

	gettimeofday(&stats_last_call_ts, NULL);

	storage->debugPrint(outfp);

	storage->getConns().printStats(outfp); // number of connection entries

	//  storage->conns.debugPrint(); // top and bottom 5

	fprintf(outfp, "getOldestTimestampMem()=%f\n", storage->getOldestTimestampMem());
	// Formatproblem: printf("tot_num_queries %lu\n", storage->getTotNumQueries());
	// 
	// Valgrind hat gemeckert!
	//printf("avg_queries_duration %f us\n",
		   //(double)storage->getTotQueriesDuration()/
		   //storage->getTotNumQueries());
}





/***************************************************************************
 * end program
 */

void
tmexit() {
	
	if (conf_main_rmtconsole) {
		// Cancel rmtconsole_listen thread 
		tmlog(TM_LOG_DEBUG, "main",  "Canceling rmtconsole_listen thread");
		pthread_cancel(rmtconsole_listen_thread_tid);
		tmlog(TM_LOG_DEBUG, "main",  "Joining rmtconsole_listen thread");
		pthread_join(rmtconsole_listen_thread_tid, NULL);
		tmlog(TM_LOG_DEBUG, "main",  "rmtconsole listen thread i DEAD.");
	}
	
	tmlog(TM_LOG_DEBUG, "main",  "Canceling stats thread");
	pthread_cancel(statisticslog_thread_tid);
	tmlog(TM_LOG_DEBUG, "main",  "Joining stats thread");
	pthread_join(statisticslog_thread_tid, NULL);
	tmlog(TM_LOG_DEBUG, "main",  "stats thread i DEAD.");

	// Cancel aggregation thread 
	tmlog(TM_LOG_DEBUG, "main",  "Canceling aggreagation thread");
	pthread_cancel(index_aggregation_thread_tid);
	tmlog(TM_LOG_DEBUG, "main",  "Joining aggreagation thread");
	pthread_join(index_aggregation_thread_tid, NULL);
	tmlog(TM_LOG_DEBUG, "main",  "Aggreagation thread i DEAD.");

#ifdef HAVE_LIBBROCCOLI
	broccoli_exit();
#endif

	//TODO: Cancel all query threads 
	//XXX: Assumption: when a thread waits for lock it is cancel-able

	// when profiling: exit immediately here
	// exit(0);


	// Deleting the storage object will break the pcap loop and thus
	// the capture thread will quit.
	// Deactivating storage will also destroy all indexes and will 
	// thus cancel all index maintaining threads
	tmlog(TM_LOG_DEBUG, "main", "deactivating storage... ");
	storage->cancelThread();
	delete storage;

	tmlog("main", "Storage deactivated");

	/*
	printf("deactivating log file...");
	LogFile* actual_log_file=log_file;
	log_file=NULL;
	delete actual_log_file;
	*/

	delete log_file;

	cmd_parser_finish();
	exit(0);
	// No need to unlock -- we are DEAD by now
}


/***************************************************************************
 * signal handler 
 */

void sighandler_exit(int sig_num) {
	//FIXME: Calling printf from a signal handler ! Still a problem with thread
	//save libc??
	// do the same for all signals

	//signal(sig_num, SIG_IGN);
	if(pthread_equal(pthread_self(), main_thread_tid)) {
		tmlog(TM_LOG_DEBUG, "sighandler", "exiting due to signal %d...", sig_num);
		//pthread_mutex_lock(&sigmutex);
		tmexit();
	}
	else {
		tmlog(TM_LOG_DEBUG, "sighandler", "exiting due to signal %d. Not the main thread received this signal", sig_num);
		tmexit();
	}
} // catch_int()




/***************************************************************************
 * CLI console thread
 */

void *cli_console_thread(void *arg) {
	/*  char buf[8192]; */
	char *line, *lastline=NULL;
	//  char *opt[2];


	char prompt[64];
	snprintf(prompt, 63, "tm@%s# ", hostname);

	printf("CLI console thread started\n");
	tmlog(TM_LOG_NOTE, "main", "CLI console thread started");

	do {
		line=readline(prompt);
		if (line !=NULL && strlen(line)>0) {
			if (!lastline || strcmp(lastline, line)) 
				add_history(line);
			if (lastline) 
				free(lastline);
			lastline=strdup(line);
			parse_cmd(line, stdout, storage, 0);
			free(line);
		}
	} while (line != NULL);

	printf("CLI console thread exiting\n");
	tmlog(TM_LOG_NOTE, "main", "CLI console thread exiting");

	return(0);
} /* cli_console_thread */

/***************************************************************************
 * index aggregation  thread
 */
void *index_aggregation_thread(void *arg) {
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
	tmlog(TM_LOG_NOTE, "main", "Index Aggregation Thread Started");
	while (1) {
		sleep(2);
		storage->aggregateIndexFiles();
	}
	return NULL;
}



/***************************************************************************
 * statisticslog thread
 * args:
 *  logging interval: int*
 */

void *statisticslog_thread(void* arg) {
	LogFile *stats_log_file;
	LogFile *classes_log_file;
	LogFile *index_log_file;

#define LOG_STRING_SIZE 1000
	struct timeval last_call_ts;
	struct timeval now;
	struct rusage r;
	double last_ru_utime=0;
	double last_ru_stime=0;
	double ru_utime, ru_stime;
	double utime_perc, stime_perc;
	double dt;
	double link_rate, uncut_rate;
	struct pcap_stat ps;
	uint64_t mem_from_os;
	uint64_t mem_alloc;

	char s[LOG_STRING_SIZE];
	unsigned last_drop = 0;

	uint64_t last_call_tot_bytes=0;
	uint64_t last_call_uncut_bytes=0;
	std::list<Fifo*> fifos;
	Indexes *indexes;

	stats_log_file = new LogFile(std::string("stats.") + conf_main_logfile_name);
	classes_log_file = new LogFile(std::string("classes.") + conf_main_logfile_name);
	index_log_file = new LogFile(std::string("indexes.") + conf_main_logfile_name);

	stats_log_file->logPlain(
			"timestamp "
			"conns "
			"dyn_class "
			"pcap_recv "
			"pcap_drop "
			"drop_ratio "
			"d_drop "
			"totPkts "
			"totBytes "
			"link_rate_mbit "
			"uncutPkts "
			"uncutBytes "
			"uncut_rate_mbit "
			"utime "
			"stime "
			"mem_os "
			"mem "
			"queryPkts "
			"queryBytes "
			);
	classes_log_file->logPlain(std::string("timestamp class ") + Fifo::getStatsStrHeader());

	index_log_file->logPlain(
			"timestamp "
			"index "
			"entries_mem "
			"qlen "
			);
	
	memset(&last_call_ts, 0, sizeof(last_call_ts));
	while (log_file) {
		if (storage->getPcapStats(&ps) < 0 ) {
			///TODO:
		}
		gettimeofday(&now, NULL);
		getrusage(RUSAGE_SELF, &r);
#ifdef USE_MALLINFO
		struct mallinfo mi=mallinfo();
		mem_from_os = mi.hblkhd + mi.arena;
		mem_alloc = mi.uordblks;
#else
		mem_from_os = mem_alloc = 0;
#endif
		ru_utime = to_tm_time(&(r.ru_utime));
		ru_stime = to_tm_time(&(r.ru_stime));
		if (last_call_ts.tv_sec) {
			dt=to_tm_time(&now) - to_tm_time(&last_call_ts);


			utime_perc = 100.*(ru_utime-last_ru_utime)/dt;
			stime_perc = 100.*(ru_stime-last_ru_stime)/dt;

			link_rate = (tot_bytes-last_call_tot_bytes)/dt/125000.;
			uncut_rate = (uncut_bytes-last_call_uncut_bytes)/dt/125000.;

		}
		else {
			link_rate = uncut_rate = 0;
			utime_perc = stime_perc = 0.0;

		}
		last_call_ts = now;
		last_call_tot_bytes=tot_bytes;
		last_call_uncut_bytes=uncut_bytes;

		last_ru_utime = ru_utime;
		last_ru_stime = ru_stime;

		snprintf(s, LOG_STRING_SIZE,
				 "%.2lf "
				 "%"PRINTF_UINT64" "
				 "%d " 
				 "%u "
				 "%u " 
				 "%.2lf%% "
				 "%u "
				 "%"PRINTF_UINT64" "
				 "%"PRINTF_UINT64" "
				 "%.2lf "
				 "%"PRINTF_UINT64" "
				 "%"PRINTF_UINT64" "
				 "%.2lf "
				 "%.1lf "
				 "%.1lf "
				 "%"PRINTF_UINT64" "
				 "%"PRINTF_UINT64" "
				 "%"PRINTF_UINT64" "
				 "%"PRINTF_UINT64" ",
				 to_tm_time(&now),
				 storage->getConns().getNumEntries(),
				 storage->getNumDynClasses(),
				 ps.ps_recv, 
				 ps.ps_drop,
				 (double)ps.ps_drop/tot_pkt_cnt*100, // pcap_recv is always 32Bit and will wrap OFTEN
				 ps.ps_drop-last_drop,
				 tot_pkt_cnt,
				 tot_bytes,
				 link_rate,
				 uncut_pkt_cnt,
				 uncut_bytes,
				 uncut_rate,
				 utime_perc,
				 stime_perc,
				 mem_from_os,
				 mem_alloc,
				 querySentPkts,
				 querySentBytes
					 
				);
		stats_log_file->logPlain(s);
		if (last_drop != ps.ps_drop) {
			log_file->log("DROP" , "we dropped packets: %u", ps.ps_drop-last_drop);
			last_drop = ps.ps_drop;
		}
			

		
		// TODO: make fifos stats and index stats consistent
		fifos = storage->getFifos();
		for(std::list<Fifo*>::iterator i=fifos.begin(); i!=fifos.end(); i++) {
			snprintf(s, LOG_STRING_SIZE, "%.2lf %s %s", to_tm_time(&now),  (*i)->getClassname().c_str(), (*i)->getStatsStr().c_str());
			classes_log_file->logPlain(s);
		}

		indexes = storage->getIndexes();
		for (std::list<IndexType*>::iterator i=indexes->begin(); i!=indexes->end(); i++) {
			snprintf(s, LOG_STRING_SIZE, 
					"%.2lf "
					"%s "
					"%"PRINTF_UINT64" "
					"%u ",
					to_tm_time(&now),
					(*i)->getIndexName().c_str(),
					(*i)->getNumEntriesRAM(),
					(*i)->getQlen()
				);
			index_log_file->logPlain(s);
		}


		//log_file->log("stats_indexes", "wait: %u. %s", had_to_wait, storage->getStatsIndexesStr().c_str());

//		log_file->log("stats_queries", "%u query subscriptions",
//					  storage->getConns().getSubscriptions());
#ifdef HAVE_LIBBROCCOLI
//		log_file->log("stats_broccoli", "peak receive queue %d bytes",
//					  broccoli_recv_q_peak);
		broccoli_recv_q_peak=0;
#endif
		// http://www.google.com/search?hl=en&lr=&q=getrusage+proc&btnG=Search

		/*
		if (dt) {
		struct timeval ucpu_dt;
		struct timeval scpu_dt;
		timersub(&r.ru_utime, &last_call_ru_utime, &ucpu_dt);
		timersub(&r.ru_stime, &last_call_ru_stime, &scpu_dt);
		log_file->log("stats_rusage", "%.2f%% CPU (%.2f%% user+%.2f%% sys) %ld MAXRSS",
		100*to_tm_time(&ucpu_dt)/dt+100*to_tm_time(&scpu_dt)/dt,
		100*to_tm_time(&ucpu_dt)/dt,
		100*to_tm_time(&scpu_dt)/dt,
		r.ru_maxrss
		);
		*/
/*		log_file->log("stats_rusage", "%.2f s user + %.2f s sys CPU  %ld MAXRSS[kB]",
					  to_tm_time(&r.ru_utime),
					  to_tm_time(&r.ru_stime),
					  r.ru_maxrss/1024
					 );*/
		//    } // if (dt)

#ifdef USE_MALLINFO
		/*
		log_file->log("stats_mallinfo", "%d arena  %d uordblks",
					  mi.arena, mi.uordblks
					 );
		*/
#endif

		sleep(*(int*)arg);
	}
	// log file was closed
	return NULL;
}




/***************************************************************************
 * usage()
 */

void usage() {
	fprintf(stderr, "usage: tm [-i interface] [-R files_file]( [-r file]) [-f filter] [-c config file]\n");
	exit(1);
}



/***************************************************************************
 * main()
 */

int
main(int argc, char** argv) {
	const char *conffile="tm.conf";

	struct sigaction exit_action;



	int i;
	inet_aton("127.0.0.1", &conf_main_rmtconsole_listen_addr);
	inet_aton("127.0.0.1", &conf_main_bro_listen_addr);

	setvbuf(stdout, NULL, _IONBF, 0);

	main_thread_tid = pthread_self();

	/*******************************************************
	 * read command line arguments
	 */

	int opt;
	while ((opt=getopt(argc, argv, "i:R:r:f:c:h")) != -1) {
		switch(opt) {
		case 'f':
			storageConf.filter.assign(optarg);
			break;
		case 'i':
			storageConf.device.assign(optarg);
			break;
		case 'r':
			storageConf.readtracefile.assign(optarg);
			break;
		case 'R':
			storageConf.tracefiles.assign(optarg);
			break;
		case 'c':
			conffile=strdup(optarg);
			break;
		case 'h':
		default:
			usage();
		}
	}
	argc -= optind;
	argv += optind;



	/***************************************************************************
	 * read and parse config file
	 */
	if (parse_config(conffile, &storageConf)) {
		fprintf(stderr, "config file errors, aborting\n");
		return(1);
	}



	memset(&stats_last_call_ts, 0, sizeof(stats_last_call_ts));

	if (chdir(conf_main_workdir)) {
		fprintf(stderr, "cannot chdir to %s\n", conf_main_workdir);
		return(1);
	}

	log_file=new LogFile(conf_main_logfile_name);
	tmlog("main", "timemachine version %s", PACKAGE_VERSION);

	if (!conf_main_daemon)
		printf("timemachine version %s\n", PACKAGE_VERSION);
	if (conf_main_daemon && conf_main_console) {
		tmlog(TM_LOG_WARN, "main", "Cannot have a console when in daemon mode. Deactivating console.");
		conf_main_console = 0;
	}

	/* I don't trust fork() together with pthreads. So I just fork now. Even if this
	 * implies, that I can't returned decent exit codes to the calling process. But 
	 * on the other hand, neiter does bind 
	 */
	if (conf_main_daemon) {
		pid_t pid;
		tmlog("main", "Forking Daemon");

		pid = fork();
		if (pid < 0) 
			tmlog(TM_LOG_ERROR, "main",  "Could not fork");
		else if (pid != 0)  /* parent */
			exit(0);
		
		/* child context */
		setsid(); 
		fclose(stdin);
		fclose(stdout);
		/* don't close stderr yet. wait until startup is finished */
	}




	// Blocl all signals. This signal mask is then inherited by spawned 
	// threads. After all threads are spanwed we will unblock again. 
	sigset_t signalSet;
	sigset_t oldSignalSet;
	sigemptyset(&signalSet);
	sigaddset(&signalSet, SIGTERM);
	sigaddset(&signalSet, SIGQUIT);
	sigaddset(&signalSet, SIGINT);
	pthread_sigmask (SIG_BLOCK, &signalSet, &oldSignalSet );
	/*******************************************************
	 * start capture
	 */



#ifdef HAVE_LIBPCAPNAV
	pcapnav_init();
#endif

	// Initialise cmd_parser
	cmd_parser_init();

	try {
		storage = new Storage(storageConf);
	} catch (const std::string& str) {
		fprintf(stderr, "%s\n", str.c_str());
		delete log_file;
		exit(1);
	}
	tmlog(TM_LOG_NOTE, "main", "capture started, capture thread");

#ifdef TM_HEAVY_DEBUG
	tmlog(TM_LOG_NOTE, "main", "Compiled with TM_HEAVY_DEBUG");
#endif

	if (!conf_main_daemon) 
		printf("capture started\n");

	i = pthread_create(&index_aggregation_thread_tid, NULL, index_aggregation_thread, NULL);
	if (i) {
		tmlog(TM_LOG_ERROR, "main", "Could not start index aggregation thread.\n");
		exit(1);
	}

	/*******************************************************
	 * trap signals
	 */
	exit_action.sa_handler = sighandler_exit;
	exit_action.sa_flags = 0;
	sigemptyset(&(exit_action.sa_mask));
	sigaddset(&(exit_action.sa_mask), SIGTERM);
	sigaddset(&(exit_action.sa_mask), SIGQUIT);
	sigaddset(&(exit_action.sa_mask), SIGINT);
	sigaction(SIGTERM, &exit_action, NULL);
	sigaction(SIGINT, &exit_action, NULL);
	sigaction(SIGQUIT, &exit_action, NULL);
	// when profiling: trap signal USR1
	// signal(SIGUSR1, catch_int);
	//  signal(SIGSEGV, catch_int);


	if (conf_main_rmtconsole) {
		i=pthread_create(&rmtconsole_listen_thread_tid, NULL, rmtconsole_listen_thread, NULL);
		if (i) {
			tmlog(TM_LOG_ERROR, "main", "Could not start rmtconsole listen  thread.\n");
			exit(1);
		}
	}


	gethostname(hostname, HOSTNAME_MAXLEN);
	hostname[HOSTNAME_MAXLEN-1]=0;

	if (conf_main_console) {
		i=pthread_create(&cli_console_thread_tid, NULL, cli_console_thread, NULL);
		if (i) {
			tmlog(TM_LOG_ERROR, "main", "Could not start console thread.\n");
			exit(1);
		}
	}

#ifdef HAVE_LIBBROCCOLI
	broccoli_init();
#else
   	if ( conf_main_bro_listen )
		tmlog(TM_LOG_WARN, "main", "Broccoli support not compiled in.\n");
#endif

		i=pthread_create(&statisticslog_thread_tid, NULL,
						 statisticslog_thread, &conf_main_log_interval);
		if (i) {
			tmlog(TM_LOG_ERROR, "main", "Could not start statisticslog thread.\n");
			exit(1);
		};

		// All threads have been spwaned now. Re-enable signal delivery
		// Only this thread will now receive signals. 
		pthread_sigmask (SIG_SETMASK, &oldSignalSet, NULL );

		//   struct sched_param param;
		//   int policy;
		//   param.sched_priority=1;

		//   i=pthread_setschedparam(cli_console_thread_tid, SCHED_FIFO, &param);
		//   if (i) {
		//     perror("thread setschedparam");
		//     exit(-1);
		//   }
		//   i=pthread_getschedparam(cli_console_thread_tid, &policy, &param);
		//   if (i) {
		//     perror("thread getschedparam");
		//     exit(-1);
		//   }
		//   printf(" cli_console_thread schedule priority is %d\n", param.sched_priority);

		/* FIXME: Possible race condition */
		if (conf_main_daemon) {
			stderr_is_open = 0;
			fclose(stderr);
		}

		if (conf_main_console) {
			pthread_join(cli_console_thread_tid, NULL);
		}
		else {
			// XXX: There must be a better way to do this!
			while (1) 
				pause();
		}
		tmexit();

}

/* A wrapper function to start a thread for each created
 * index object
 */
extern "C" {
void *start_index_thread(void *instance) {
//	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
	((IndexType *)(instance))->run();
	return NULL;
}
}
